//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// seq_scan_executor.cpp
//
// Identification: src/execution/seq_scan_executor.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#include "execution/executors/seq_scan_executor.h"

#include <utility>

namespace bustub {

/**
 * Construct a sequential-scan executor.
 *
 * The table iterator is created here, at construction time, from the table
 * identified by the plan's table oid (looked up through the context's catalog).
 *
 * @param exec_ctx executor context; forwarded to AbstractExecutor
 * @param plan the sequential-scan plan node this executor runs
 */
SeqScanExecutor::SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      // Uses the `plan` argument rather than the `plan_` member: members are initialized in
      // declaration order, so reading `plan_` here is only valid if it is declared before
      // `table_iter_`. The argument is always valid. `exec_ctx_` belongs to the base class,
      // which is fully constructed before any member of this class.
      table_iter_(exec_ctx_->GetCatalog()->GetTable(plan->GetTableOid())->table_->MakeEagerIterator()) {}

void SeqScanExecutor::Init() {
  cursor_ = 0;
  res_.clear();
  auto oid = plan_->GetTableOid();
  auto txn = exec_ctx_->GetTransaction();
  auto lock_mgr = exec_ctx_->GetLockManager();

  if (GetExecutorContext()->IsDelete()) {
    // lock_mgr->LockTable(txn, LockManager::LockMode::INTENTION_EXCLUSIVE, oid);
  } else {
    LOG_DEBUG("check LockTable");
    if ((txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ ||
         txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) &&
        (!txn->IsTableIntentionExclusiveLocked(oid) && !txn->IsTableIntentionSharedLocked(oid))) {
      try {
        LOG_DEBUG("LockTable, holdmode");
        bool res = lock_mgr->LockTable(txn, LockManager::LockMode::INTENTION_SHARED, oid);
        if (!res) {
          throw ExecutionException("seq-scan");
        }
      } catch (TransactionAbortException) {
        throw ExecutionException("seq-scan");
      }
    }
  }
}

/**
 * Produce the next non-deleted tuple of the scanned table.
 *
 * Advances table_iter_, skipping entries whose metadata is marked deleted. Unless the
 * executor context reports a delete pipeline, each visited row is SHARED-locked under
 * REPEATABLE_READ and READ_COMMITTED when the transaction does not already hold it
 * exclusively. Under READ_COMMITTED that shared lock is released as soon as the row has
 * been examined; under REPEATABLE_READ it is kept (also for skipped, deleted rows).
 *
 * A row lock is released here only if this call acquired it. A row the transaction already
 * holds exclusively is read without touching that lock, so the scan never drops a lock it
 * did not take.
 *
 * @param[out] tuple receives the tuple when true is returned
 * @param[out] rid receives the RID of the most recently visited row
 * @return true if a tuple was produced, false once the iterator is exhausted
 * @throws ExecutionException if locking or unlocking a row reports failure or raises
 *         TransactionAbortException
 */
auto SeqScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  // FIXME: Hint 4 (of the assignment notes) is not understood yet.
  LOG_DEBUG("Next");
  const auto oid = plan_->GetTableOid();
  auto txn = exec_ctx_->GetTransaction();
  auto lock_mgr = exec_ctx_->GetLockManager();

  // Neither value changes while this call runs, so evaluate them once.
  const bool is_delete = GetExecutorContext()->IsDelete();
  const auto isolation = txn->GetIsolationLevel();
  const bool takes_row_locks =
      !is_delete && (isolation == IsolationLevel::REPEATABLE_READ || isolation == IsolationLevel::READ_COMMITTED);
  const bool releases_after_read = !is_delete && isolation == IsolationLevel::READ_COMMITTED;

  for (; !table_iter_.IsEnd(); ++table_iter_) {
    *rid = table_iter_.GetRID();

    // Remember whether the shared lock was taken by this call; only then may it be released.
    bool locked_here = false;
    if (takes_row_locks && !txn->IsRowExclusiveLocked(oid, *rid)) {
      try {
        if (!lock_mgr->LockRow(txn, LockManager::LockMode::SHARED, oid, *rid)) {
          throw ExecutionException("seq-scan");
        }
      } catch (const TransactionAbortException &) {
        throw ExecutionException("seq-scan");
      }
      locked_here = true;
    }

    // Fetch metadata and payload together, once, while any lock taken above is still held.
    auto fetched = table_iter_.GetTuple();
    const bool deleted = fetched.first.is_deleted_;
    if (!deleted) {
      *tuple = std::move(fetched.second);
    }

    if (locked_here && releases_after_read) {
      try {
        if (!lock_mgr->UnlockRow(txn, oid, *rid)) {
          throw ExecutionException("seq-scan");
        }
      } catch (const TransactionAbortException &) {
        throw ExecutionException("seq-scan");
      }
    }

    if (!deleted) {
      // Step past the row being returned so the next call starts at its successor.
      ++table_iter_;
      return true;
    }
  }
  return false;
}

}  // namespace bustub
